package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
//   start agreement on a new log entry
// rf.GetState() (term, isLeader)
//   ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import (
	//	"bytes"
	"sync"
	"sync/atomic"
	"time"

	//	"6.824/labgob"
	"6.824/labrpc"
)

// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in part 2D you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh, but set CommandValid to false for these
// other uses.
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int

	// For 2D:
	SnapshotValid bool
	Snapshot      []byte
	SnapshotTerm  int
	SnapshotIndex int
}

// A Go object implementing a single Raft peer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()
	//2A
	term             int
	votedFor         int
	peersVoteGranted []bool
	role             int
	heartBeatTimeOut time.Time

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

}

const (
	follower  = 0
	candidate = 1
	leader    = 2

	electionTimeOutMin = 300
	electionTimeOutMax = 500
)

type AppendEntriesArgs struct {
	Term         int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []byte //保存的日志条目
	LeaderCommit int    //提交的日志
}

type AppendEntriesReply struct {
	Term    int
	Success bool
}

func (rf *Raft) isHeartBeatTimeOut() bool {
	return rf.heartBeatTimeOut.Before(time.Now())
}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
	// Your code here (2A).
	return rf.term, rf.role == leader
}

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
func (rf *Raft) persist() {
	// Your code here (2C).
	// Example:
	// w := new(bytes.Buffer)
	// e := labgob.NewEncoder(w)
	// e.Encode(rf.xxx)
	// e.Encode(rf.yyy)
	// data := w.Bytes()
	// rf.persister.SaveRaftState(data)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	// Your code here (2C).
	// Example:
	// r := bytes.NewBuffer(data)
	// d := labgob.NewDecoder(r)
	// var xxx
	// var yyy
	// if d.Decode(&xxx) != nil ||
	//    d.Decode(&yyy) != nil {
	//   error...
	// } else {
	//   rf.xxx = xxx
	//   rf.yyy = yyy
	// }
}

// A service wants to switch to snapshot.  Only do so if Raft hasn't
// have more recent info since it communicate the snapshot on applyCh.
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {

	// Your code here (2D).

	return true
}

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (2D).

}

// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct { //candidate的请求投票
	Term         int
	Candidate_id int //让投票人能够知道给谁投票
	LastLogIndex int
	LastLogTerm  int
	// Your data here (2A, 2B).
}

// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
	// Your data here (2A).
	Term        int
	VoteGranted bool //是否投票给这个candidate
}

func (rf *Raft) increaseTerm(term int) {
	rf.term = term
	rf.votedFor = -1 //任期被改变的时候会被清空
	rf.role = follower
	rf.persist()
}

//
// example RequestVote RPC handler.
//follower给candidate投票

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if args.Term > rf.term {
		rf.increaseTerm(args.Term)
	}
	reply.Term = rf.term // 回复的是经过更新过后的任期数 或是更大的任期数
	//判断是否要投票给args
	if args.Term >= rf.term && rf.votedFor == -1 {
		reply.VoteGranted = true
		rf.heartBeatTimeOut = time.Now().Add(getRandElectionTimeOut())
		rf.votedFor = args.Candidate_id
		rf.persist()
	}

}

// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The labrpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
// Call() sends a request and waits for a reply. If a reply arrives
// within a timeout interval, Call() returns true; otherwise
// Call() returns false. Thus Call() may not return for a while.
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return.  Thus there
// is no need to implement your own timeouts around Call().
//
// look at the comments in ../labrpc/labrpc.go for more details.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	return ok
}

func (rf *Raft) getGrantedVotes() int {
	count := 0
	for node := 0; node < len(rf.peers); node++ {
		if rf.peersVoteGranted[node] {
			count++
		}
	}
	return count
}

func (rf *Raft) collectVotes() {

	askVote := func(server int, args *RequestVoteArgs) {
		reply := &RequestVoteReply{}
		ok := rf.sendRequestVote(server, args, reply)
		rf.mu.Lock()
		if reply.Term > rf.term {
			rf.increaseTerm(reply.Term)
		}
		rf.peersVoteGranted[server] = ok && reply.VoteGranted && args.Term == rf.term
		rf.mu.Unlock()
	}

	rf.mu.Lock()
	args := &RequestVoteArgs{
		Term:         rf.term,
		Candidate_id: rf.me,
	}
	rf.mu.Unlock()
	for node := 0; node < len(rf.peers); node++ {
		if node != rf.me {
			go askVote(node, args)
		}
	}

}

func (rf *Raft) startElection() {
	rf.mu.Lock()
	rf.increaseTerm(rf.term + 1)
	rf.votedFor = rf.me
	rf.role = candidate
	rf.heartBeatTimeOut = time.Now().Add(getRandElectionTimeOut())
	//初始化获得票数的list[]，刚开始只有自己给自己投票
	for node := 0; node < len(rf.peers); node++ {
		rf.peersVoteGranted[node] = node == rf.me
	}
	rf.persist()

	rf.mu.Unlock()

	go rf.collectVotes()

	for !rf.killed() {
		rf.mu.Lock()
		cnt := rf.getGrantedVotes()
		if rf.role == follower {
			rf.mu.Unlock()
			return
		} else if cnt > len(rf.peers)/2 {
			rf.role = leader
			go rf.sentHeartBeatsToAll(rf.term)
			rf.mu.Unlock()
			return
		} else if rf.isHeartBeatTimeOut() {
			rf.mu.Unlock()
			rf.startElection()
			return
		}
		rf.mu.Unlock()
		time.Sleep(25 * time.Millisecond) //作用？ 是否可以优化

	}

}
func (rf *Raft) sentHeartBeatsToAll(startTerm int) {

	sentHeartBeats := func(server int, args *AppendEntriesArgs) {
		reply := &AppendEntriesReply{}
		rf.peers[server].Call("Raft.AppendEntries", args, reply)
		rf.mu.Lock()
		defer rf.mu.Unlock()
		if reply.Term > rf.term {
			rf.increaseTerm(reply.Term) //有server的任期比自己高，回到follower
		}
	}

	if !rf.killed() {
		rf.mu.Lock()
		if rf.term != startTerm { //由于————————原因，现在不再是leader了，停止发送心跳
			rf.mu.Unlock()
			return
		}
		args := &AppendEntriesArgs{
			Term: rf.term,
		}
		rf.mu.Unlock()
		for server := 0; server < len(rf.peers); server++ {
			if server != rf.me {
				go sentHeartBeats(server, args)
			}
		}
		time.Sleep(130 * time.Millisecond)

	}

}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if args.Term < rf.term {
		reply.Term = rf.term
		reply.Success = false
		return
	}
	reply.Term = rf.term
	reply.Success = true
	return
}

// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1
	isLeader := true

	// Your code here (2B).

	return index, term, isLeader
}

// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
}

func (rf *Raft) killed() bool {
	z := atomic.LoadInt32(&rf.dead)
	return z == 1
}

// The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func (rf *Raft) ticker() {
	for rf.killed() == false {
		// Your code here to check if a leader election should
		// be started and to randomize sleeping time using
		// time.Sleep().
		rf.mu.Lock()
		if rf.isHeartBeatTimeOut() && rf.role == follower {
			rf.mu.Unlock()
			rf.startElection()
		} else {
			rf.mu.Unlock()
		}
		time.Sleep(60 * time.Millisecond)

	}
}

// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me
	rf.votedFor = -1
	rf.peersVoteGranted = make([]bool, len(peers))
	rf.role = follower
	rf.heartBeatTimeOut = time.Now().Add(getRandElectionTimeOut())

	// Your initialization code here (2A, 2B, 2C).

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// start ticker goroutine to start elections
	go rf.ticker()

	return rf
}
